热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

接收者|下图_java实现利用RabbitMQ发送和消费消息

篇首语:本文由编程笔记#小编为大家整理,主要介绍了java实现利用RabbitMQ发送和消费消息相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了java 实现利用 RabbitMQ 发送和消费消息相关的知识,希望对你有一定的参考价值。




1.简介

     RabbitMQ 是一个消息代理。从本质上说,它从生产者接收消息,然后把这些消息传递给消费者。在这期间,它能根据你制定的规则发送,缓存,或者持久化存储这些消息。

RabbitMQ 使用到的专业术语。

1).Producing 的意思不仅仅是发送消息。发送消息的程序叫做producer。我们像下图一样描绘它。

2).Queue 是一个消息盒子的名称。它存活在 RabbitMQ 里。虽然消息流经 RabbitMQ 和你的应用程序,但是他们只能在 Queue 里才能被保存。Queue 没有任何边界的限制,你想存多少消息都可以,它本质上是一个无限的缓存。许多生产者都可以向一个 Queue 里发送消息,许多消费者都可以从一个 Queue 里接收消息。我们下图一样描绘它。


3).Consuming 的意思和接收类似。一个消费者主要是指等待接收消息的程序。我们下图一样描绘它。

   

注意:生产者,消费者和代理不一定非要在同一台机器上,在大多数应用中的确也是这样的。



2."Hello World"

在这部分的使用指南中,我们要用 java 写两个程序;一个是生产者,他发送一个消息,另一个是消费者,它接收消息,并且把消息打印出来。我们将会忽略一些Java API 的细节,而是将注意力主要放在我们将要做的这件事上,这件事就是发送一个 "Hello World" 消息。

在下面的图中,"P" 代表生产者,而 "C" 代表消费者。中间的就是一个 Queue,一个消息缓存区。 

  

java 客户端类库

AMQP 是一个开源的,通用的消息协议。有几个用不同语言编写的 AMQP 的客户端。我们将要使用 RabbitMQ 提供的 java 版本的客户端。

下载这个客户端 (http://www.rabbitmq.com/java-client.html) ,把它解压到你的工作目录下,并且找到jar文件。

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

这个客户端在 Maven 的中心仓库中也有。

groupId:com.rabbitmq

artifactId:amqp-client

现在我们有了客户端和依赖,我们开始写代码。


Sending

 

我们把消息发送者叫 Send,消息接收者叫 Recv。消息发送者连接 RabbitMQ ,发送一个消息,然后退出。

创建 Send.java 文件,并导入我们需要的类。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

创建给 Queue 命名

public class Send
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException
...

然后我们创建一个连接。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

到这,我们就和本机的代理建立了连接。如果我们想要连接到不同机器上的代理,我们只需要制定那台机器的域名或 IP 地址即可。

接下来,我们创建一个管道。为了发送消息,我们还需要声明一个 Queue ,然后我们就能发布消息到 Queue 上了。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

Queue 声明的过程是幂等的,只有在它不存在的情况下才会被创建出来。消息的内容是字节数组,这意味着你能够使用任何你想使用的编码。

最后,我们关闭这个管道和连接。

下面是我们完整的 Send.java 文件

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send
private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
       



Receiving


不像消息的发送者只是发送一个消息,我们的接收者需要不断的监听消息,并把它们打印出来。



在 Recv.java 这个文件中需要导入的类和 Send.java 中类似。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer 是一个实现了 Consumer 接口的类,我们将会用它来缓存从服务器推送过来的消息。


创建 Recv.java 文件,打开一个连接和一个管道,并且声明一个我们将要消费消息的 Queue。注意:这个 Queue 的名字要和 Send.java 文件中的一致。

public class Recv
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...


注意到,我们在这又声明了一个 Queue,这是因为我们可能在运行 Send.java 文件之前运行 Recv.java 文件,所以我们要确保在从 Queue 消费消息之前,

它是已经存在的。

我们告诉服务器从 Queue 中向我们传递消息,由于它是以异步的方式向我们传递消息,所以我们采用从消息的缓存对象回调的方式,直到我们已经消费了这些消

息。这就是 DefaultConsumer 子类所做的事。

Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");

;
channel.basicConsume(QUEUE_NAME, true, consumer);

下面是完整的 Recv.java 文件。

import com.rabbitmq.client.*;
import java.io.IOException;

public class Recv
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    Consumer consumer = new DefaultConsumer(channel)
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
     
    ;
    channel.basicConsume(QUEUE_NAME, true, consumer);



Putting it all together

编译文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

运行时,我们需要 rabbitmq-client.jar,打开终端,运行 Send 文件:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然后运行 Recv 文件:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

消息的接收者将会打印出发送者发送的消息,接收者将保持运行的状态,并且等待消息,使用 Ctrl+C 可以结束运行。因此,尽可能从另一个终端运行发送者程序。

原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html








推荐阅读
author-avatar
兰花m123_680
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有